

package com.hazelcast.jet.impl.connector;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;

import javax.annotation.Nonnull;

public final class UpdateMapWithEntryProcessorP<T, K, V, R> extends AsyncHazelcastWriterP {

    private final IMap<K, V> map;
    private final FunctionEx<? super T, ? extends K> toKeyFn;
    private final FunctionEx<? super T, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn;

    public UpdateMapWithEntryProcessorP(@Nonnull HazelcastInstance instance, int maxParallelAsyncOps, @Nonnull String name, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn) {
        super(instance, maxParallelAsyncOps);
        this.map = instance.getMap(name);
        this.toKeyFn = toKeyFn;
        this.toEntryProcessorFn = toEntryProcessorFn;
    }

    @Override
    protected void processInternal(Inbox inbox) {
        int permits = tryAcquirePermits(inbox.size());
        for (Object object; permits > 0 && (object = inbox.peek()) != null; permits--) {
            @SuppressWarnings("unchecked") T item = (T) object;
            EntryProcessor<K, V, R> entryProcessor = toEntryProcessorFn.apply(item);
            K key = toKeyFn.apply(item);
            setCallback(map.submitToKey(key, entryProcessor));
            inbox.remove();
        }
    }
}
